/**
 * 
 */
package org.swing.utility.common.collection;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;

import org.swing.utility.common.clazz.CloserUtil;
import org.swing.utility.common.exception.RuntimeIOException;
import org.swing.utility.common.file.IOUtil;

/**
 * @author lqnhu
 *
 */
public class SortingLongCollection {
	public static final int SIZEOF = 8;
	public static final int MAX_ITEMS_IN_RAM = (int) Math
			.floor((Integer.MAX_VALUE / 8) * .999);
	/**
	 * Where files of sorted values go.
	 */
	private final File[] tmpDir;
	private final int maxValuesInRam;
	private int numValuesInRam = 0;
	private long[] ramValues;
	/**
	 * Set to true when done adding and ready to iterate
	 */
	private boolean doneAdding = false;
	/**
	 * Set to true when all temp files have been cleaned up
	 */
	private boolean cleanedUp = false;
	/**
	 * List of files in tmpDir containing sorted values
	 */
	private final List<File> files = new ArrayList<File>();
	// for in-memory iteration
	private int iterationIndex = 0;
	// For disk-based iteration
	private PriorityQueue<PeekFileValueIterator> priorityQueue;

	/**
	 * Prepare to accumulate values to be sorted
	 *
	 * @param maxValuesInRam
	 *            how many values to accumulate before spilling to disk
	 * @param tmpDir
	 *            Where to write files of values that will not fit in RAM
	 */
	public SortingLongCollection(final int maxValuesInRam, final File... tmpDir) {
		if (maxValuesInRam <= 0) {
			throw new IllegalArgumentException("maxValuesInRam must be > 0");
		}
		this.tmpDir = tmpDir;
		this.maxValuesInRam = Math.min(maxValuesInRam, MAX_ITEMS_IN_RAM);
		this.ramValues = new long[maxValuesInRam];
	}

	/**
	 * Add a value to the collection.
	 *
	 * @param value
	 */
	public void add(final long value) {
		if (doneAdding) {
			throw new IllegalStateException(
					"Cannot add after calling doneAddingStartIteration()");
		}
		if (numValuesInRam == maxValuesInRam) {
			spillToDisk();
		}
		ramValues[numValuesInRam++] = value;
	}

	/**
	 * This method must be called after done adding, and before calling
	 * hasNext() or next().
	 */
	public void doneAddingStartIteration() {
		if (cleanedUp || doneAdding) {
			throw new IllegalStateException(
					"Cannot call doneAddingStartIteration() after cleanup() was called.");
		}
		doneAdding = true;
		if (this.files.isEmpty()) {
			Arrays.sort(this.ramValues, 0, this.numValuesInRam);
			return;
		}
		if (this.numValuesInRam > 0) {
			spillToDisk();
		}
		this.priorityQueue = new PriorityQueue<PeekFileValueIterator>(
				files.size(), new PeekFileValueIteratorComparator());
		for (final File f : files) {
			final FileValueIterator it = new FileValueIterator(f);
			if (it.hasNext()) {
				this.priorityQueue.offer(new PeekFileValueIterator(it));
			}
		}
		// Facilitate GC
		this.ramValues = null;
	}

	/**
	 * Sort the values in memory, write them to a file, and clear the buffer of
	 * values in memory.
	 */
	private void spillToDisk() {
		try {
			Arrays.sort(this.ramValues, 0, this.numValuesInRam);
			final File f = IOUtil.newTempFile("sortingcollection.", ".tmp",
					this.tmpDir, IOUtil.FIVE_GBS);
			DataOutputStream os = null;
			try {
				final long numBytes = this.numValuesInRam * SIZEOF;
				os = new DataOutputStream(
						IOUtil.maybeBufferOutputStream(new FileOutputStream(f)));
				f.deleteOnExit();
				for (int i = 0; i < this.numValuesInRam; ++i) {
					os.writeLong(ramValues[i]);
				}
				os.flush();
			} finally {
				if (os != null) {
					os.close();
				}
			}
			this.numValuesInRam = 0;
			this.files.add(f);
		} catch (IOException e) {
			throw new RuntimeIOException(e);
		}
	}

	/**
	 * Delete any temporary files. After this method is called, no other method
	 * calls should be made on this object.
	 */
	public void cleanup() {
		this.doneAdding = true;
		this.cleanedUp = true;
		this.ramValues = null;
		IOUtil.deleteFiles(this.files);
	}

	/**
	 * Call only after doneAddingStartIteration() has been called.
	 *
	 * @return true if there is another value to be gotten.
	 */
	public boolean hasNext() {
		if (!doneAdding || cleanedUp) {
			throw new IllegalStateException();
		}
		if (this.ramValues != null) {
			// in-memory iteration
			return this.iterationIndex < numValuesInRam;
		} else {
			return !priorityQueue.isEmpty();
		}
	}

	/**
	 * Call only if hasNext() == true.
	 *
	 * @return next value from collection, in natural sort order.
	 */
	public long next() {
		if (!hasNext()) {
			throw new NoSuchElementException();
		}
		if (this.ramValues != null) {
			// in-memory iteration
			return ramValues[iterationIndex++];
		} else {
			final PeekFileValueIterator fileIterator = priorityQueue.poll();
			final long ret = fileIterator.next();
			if (fileIterator.hasNext()) {
				this.priorityQueue.offer(fileIterator);
			} else {
				fileIterator.close();
			}
			return ret;
		}
	}

	/**
	 * Read a file of longs
	 */
	private static class FileValueIterator {
		private final File file;
		private final DataInputStream is;
		private long currentRecord = 0;
		private boolean isCurrentRecord = true;

		FileValueIterator(final File file) {
			this.file = file;
			try {
				is = new DataInputStream(
						IOUtil.maybeBufferInputStream(new FileInputStream(file)));
				next();
			} catch (FileNotFoundException e) {
				throw new RuntimeIOException(file.getAbsolutePath(), e);
			}
		}

		boolean hasNext() {
			return isCurrentRecord;
		}

		long next() {
			if (!hasNext()) {
				throw new NoSuchElementException();
			}
			final long ret = currentRecord;
			try {
				currentRecord = is.readLong();
			} catch (final EOFException eof) {
				isCurrentRecord = false;
				currentRecord = 0;
			} catch (final IOException e) {
				throw new RuntimeException(e);
			}
			return ret;
		}

		void close() {
			CloserUtil.close(is);
		}
	}

	/**
	 * Add peek() functionality to FileValueIterator
	 */
	private static class PeekFileValueIterator {
		private FileValueIterator underlyingIterator;
		private long peekValue;
		private boolean hasPeekedValue = false;

		PeekFileValueIterator(final FileValueIterator underlyingIterator) {
			this.underlyingIterator = underlyingIterator;
		}

		boolean hasNext() {
			return hasPeekedValue || underlyingIterator.hasNext();
		}

		long next() {
			if (!hasNext()) {
				throw new NoSuchElementException();
			}
			if (hasPeekedValue) {
				hasPeekedValue = false;
				return peekValue;
			}
			return underlyingIterator.next();
		}

		long peek() {
			if (!hasNext()) {
				throw new NoSuchElementException();
			}
			if (!hasPeekedValue) {
				peekValue = underlyingIterator.next();
				hasPeekedValue = true;
			}
			return peekValue;
		}

		void close() {
			underlyingIterator.close();
			hasPeekedValue = false;
			underlyingIterator = null;
		}
	}

	private static class PeekFileValueIteratorComparator implements
			Comparator<PeekFileValueIterator> {
		public int compare(final PeekFileValueIterator it1,
				final PeekFileValueIterator it2) {
			if (it1.peek() < it2.peek()) {
				return -1;
			}
			if (it1.peek() == it2.peek()) {
				return 0;
			}
			return 1;
		}
	}
}
